/*
 *文件名： ExecMqtt.java
 *版权： Copyright by 云天励飞 intellif.com
 *描述： Description
 *创建人：mozping
 *创建时间： 2018/9/14 19:33
 *修改理由：
 *修改内容：
 */
package indi.mozping;

import indi.mozping.kafka.producer.MqttMessageGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 执行MQTT的类，程序启动后自动调用生产者方法生效mqtt消息
 *
 * @author mozping
 * @version 1.0
 * @date 2018/9/14 19:33
 * @see ExecMqtt
 * @since JDK1.8
 */
@Component
public class ExecMqtt implements CommandLineRunner {


    private static final Logger LOG = LoggerFactory.getLogger(ExecMqtt.class);

    @Autowired
    MqttMessageGateway mqttMessageGateway;

    @Value("${mqtt.topic:test-topic}")
    String topic;

    @Override
    public void run(String... args) throws Exception {
        int count = 0;
        while (true) {
            if (count == 3000) {
                return;
            }
            try {
                Message message = MessageBuilder.withPayload("mqtt test message" + new Date()).
                        setHeader(MqttHeaders.TOPIC, topic).build();
                LOG.info("Send to mqtt, message info is:" + message.getPayload().toString());
                mqttMessageGateway.sendMessage(message);
                Thread.sleep(5 * 1000);
                count++;
            } catch (InterruptedException e) {
                LOG.info("发送消息到mqtt异常...");
            }
        }
    }
}